package com.cyao.config;

import com.cyao.core.ScheduleJob;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.utils.PoolUtils;
import net.greghaines.jesque.worker.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;

import javax.annotation.Resource;
import java.util.Collection;
import java.util.Collections;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;
import static net.greghaines.jesque.utils.ResqueConstants.FREQUENCY;
import static net.greghaines.jesque.utils.ResqueConstants.QUEUE;

/**
 * Spring configuration for Jesque (https://github.com/gresrun/jesque).
 *
 * <p>Declares the Jesque {@link Config}, {@link Client}, {@link Jedis} and {@link Worker} beans,
 * starts the worker on a dedicated thread once the application has started
 * ({@link CommandLineRunner#run}), and stops it again when the bean is destroyed
 * ({@link DisposableBean#destroy}).
 *
 * @author Cyao
 */
@Configuration
public class JesqueConfiguration implements CommandLineRunner, DisposableBean {
    // Named after the declaring class instead of getClass(), which may resolve to a
    // runtime-generated subclass for @Configuration classes.
    private static final Logger LOGGER = LoggerFactory.getLogger(JesqueConfiguration.class);

    /**
     * Lua script that moves every overdue recurring job in a delayed queue to "now".
     *
     * <p>KEYS[1] is the queue key, KEYS[2] the frequency hash key, ARGV[1] the current time in
     * milliseconds. Only members that have an entry in the frequency hash are touched.
     *
     * <p>Two details matter here:
     * <ul>
     *   <li>A missing hash field reaches Lua as {@code false} (not {@code nil}), so the presence
     *       check must be a truthiness test.</li>
     *   <li>{@code ZSCORE} and {@code ARGV} values are strings in Lua; they are converted with
     *       {@code tonumber} so the comparison is numeric rather than lexicographic.</li>
     * </ul>
     */
    private static final String RESET_RECURRING_JOBS_SCRIPT =
            "local queueKey = KEYS[1]\n" +
            "local freqKey = KEYS[2]\n" +
            "local now = tonumber(ARGV[1])\n" +
            "local ok, queueType = next(redis.call('TYPE', queueKey))\n" +
            "if queueType == 'zset' then\n" +
            "\tlocal zsetMembers = redis.call('ZRANGE', queueKey, '0', '-1')\n" +
            "\tfor _, member in ipairs(zsetMembers) do\n" +
            "\t\tlocal score = tonumber(redis.call('ZSCORE', queueKey, member))\n" +
            "\t\tlocal frequency = redis.call('HGET', freqKey, member)\n" +
            "\t\tif frequency and score and score < now then\n" +
            "\t\t\tredis.call('ZADD', queueKey, ARGV[1], member)\n" +
            "\t\tend\n" +
            "\tend\n" +
            "end\n" +
            "return nil";

    /**
     * Builds the Jesque configuration from the application's Redis properties.
     *
     * @param redisProperties Spring Boot Redis settings (host, port, database, password)
     * @return the Jesque configuration; an empty password is treated as "no password"
     */
    @Bean
    public Config config(RedisProperties redisProperties) {
        String password = redisProperties.getPassword();
        return new ConfigBuilder()
                .withHost(redisProperties.getHost())
                .withPort(redisProperties.getPort())
                .withDatabase(redisProperties.getDatabase())
                .withPassword("".equals(password) ? null : password)
                .build();
    }

    /**
     * Creates the pooled Jesque client used to enqueue jobs.
     *
     * @param config the Jesque configuration
     * @return a client backed by a Jedis pool created from {@code config}
     */
    @Bean
    public Client client(Config config) {
        return new ClientPoolImpl(config, PoolUtils.createJedisPool(config));
    }

    /**
     * Creates the Redis connection handed to the worker and used for the start-up reset script.
     *
     * <p>The connection is authenticated and switched to the configured database here so that it
     * targets the same Redis database as the rest of the Jesque setup no matter which consumer
     * uses it first.
     *
     * @param config the Jesque configuration
     * @return a Jedis connection for the configured host, port, timeout, password and database
     */
    @Bean
    public Jedis jedis(Config config) {
        Jedis connection = new Jedis(config.getHost(), config.getPort(), config.getTimeout());
        if (config.getPassword() != null) {
            connection.auth(config.getPassword());
        }
        connection.select(config.getDatabase());
        return connection;
    }

    /**
     * Creates the worker that processes {@link ScheduleJob} instances from
     * {@link ScheduleJob#QUEUE_NAME}.
     *
     * @param config the Jesque configuration
     * @param jedis  the Redis connection the worker operates on
     * @return the configured worker (not yet started)
     */
    @Bean
    public Worker worker(Config config, Jedis jedis) {
        Collection<String> queues = Collections.singletonList(ScheduleJob.QUEUE_NAME);
        JobFactory jobFactory = new MapBasedJobFactory(map(entry("ScheduleJob", ScheduleJob.class)));
        return new WorkerImpl(config, queues, jobFactory, jedis, NextQueueStrategy.DRAIN_WHILE_MESSAGES_EXISTS);
    }

    @Resource
    private Worker worker;

    @Resource
    private Jedis jedis;

    // Written by run() and read by destroy(); volatile because destroy() may execute on a
    // different thread than run() (e.g. a shutdown hook).
    private volatile Thread workerThread;

    /**
     * Resets overdue recurring jobs and then starts the worker on its own thread.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(String... args) {
        // Reset recurring jobs before the worker starts polling.
        resetRecurringJobs();

        workerThread = new Thread(worker, "jesque-worker");
        workerThread.start();
    }

    /**
     * Resets recurring jobs at application start-up so that a job whose scheduled time passed
     * while the application was down is not executed multiple times after start-up.
     *
     * <p>Runs {@link #RESET_RECURRING_JOBS_SCRIPT} against the schedule queue and its frequency
     * hash, passing the current time in milliseconds.
     */
    private void resetRecurringJobs() {
        final String queueKey = JesqueUtils.createKey(ConfigBuilder.DEFAULT_NAMESPACE, QUEUE, ScheduleJob.QUEUE_NAME);
        final String freqKey = JesqueUtils.createKey(ConfigBuilder.DEFAULT_NAMESPACE, QUEUE, ScheduleJob.QUEUE_NAME, FREQUENCY);

        String resetQueueScriptHash = jedis.scriptLoad(RESET_RECURRING_JOBS_SCRIPT);

        jedis.evalsha(resetQueueScriptHash, 2, queueKey, freqKey, Long.toString(System.currentTimeMillis()));
    }

    /**
     * Asks the worker to stop and waits for the worker thread to finish.
     *
     * <p>Safe to call even if {@link #run(String...)} never executed (for example when start-up
     * failed before command-line runners were invoked); in that case there is no thread to join.
     *
     * @throws Exception if the calling thread is interrupted while waiting for the worker thread
     */
    @Override
    public void destroy() throws Exception {
        LOGGER.info("------------destroy worker start------------");

        if (worker != null) {
            // end(false) is a non-immediate stop request; join() below waits for the thread to exit.
            worker.end(false);
        }

        Thread thread = workerThread;
        if (thread != null) {
            thread.join();
        }

        LOGGER.info("------------destroy worker end------------");
    }
}
